use crate::handler::MessageHandler;
use crate::message::{CallReturn, Message};
use alloc::boxed::Box;
use alloc::collections::BTreeMap;
use alloc::format;
use alloc::string::{String, ToString};
use alloc::sync::{Arc, Weak};
use alloc::vec::Vec;
use core::sync::atomic::{AtomicBool, AtomicU32, Ordering};
use core::time::Duration;
use embed_std::sync::channel::{sync_channel, Receiver, SyncSender};
use embed_std::sync::mutex::Mutex;
use embed_std::{debugln, errorln, infoln, thread, warnln};

// NOTE(review): not referenced by any code visible in this file; `MessageBus::new`
// takes the queue capacity as its `size` argument instead. Presumably intended
// as a default capacity — confirm whether it is still needed.
const QUEUE_SIZE: usize = 100000;

/// One subscription entry stored under a topic.
///
/// `subscribe` asserts that at least one of the two fields is `Some`. When both
/// are set, the dispatcher uses `owned` and never looks at `borrowed`.
struct MessageBusItem {
    /// Handler whose boxed value was handed to the bus by the subscriber;
    /// `subscribe` wraps it in an `Arc`.
    pub owned: Option<Arc<Box<dyn MessageHandler>>>,
    /// Weak reference to a handler owned elsewhere. The dispatcher upgrades it
    /// for each message and skips the entry when the upgrade fails.
    pub borrowed: Option<Weak<Box<dyn MessageHandler>>>,
}

/// State shared between every `MessageBus` handle and the dispatcher thread.
pub struct MessageBusInner {
    /// Subscriptions, keyed first by topic and then by the subscription's
    /// sequence string (the `seq` part of the token returned by `subscribe`).
    handlers: Mutex<BTreeMap<String, BTreeMap<String, MessageBusItem>>>,
    /// Checked by the dispatcher loop before each receive; the loop exits once
    /// this is `true`. NOTE(review): no code visible in this file ever sets it.
    is_quit: AtomicBool,
    /// Sending half of the message queue; used by `publish` and `call`.
    tx: SyncSender<Message>,
    /// Counter used to generate subscription sequence numbers; starts at 1.
    token_seq: AtomicU32,
}

/// Handle to a message bus: a thin wrapper around an `Arc<MessageBusInner>`,
/// so clones of a `MessageBus` refer to the same shared state.
pub struct MessageBus(Arc<MessageBusInner>);

impl Clone for MessageBus {
    fn clone(&self) -> Self {
        MessageBus(self.0.clone())
    }
}

impl MessageBus {
    /// Creates a bus and starts its dispatcher thread.
    ///
    /// `size` is forwarded to `sync_channel` as the capacity of the message
    /// queue. The handler table starts empty and subscription sequence
    /// numbers start at 1.
    pub fn new(size: usize) -> MessageBus {
        let (tx, rx) = sync_channel::<Message>(size);
        let bus = MessageBus(Arc::new(MessageBusInner {
            handlers: Mutex::new(BTreeMap::new()),
            is_quit: AtomicBool::new(false),
            tx,
            token_seq: AtomicU32::new(1),
        }));
        // The receiving half is handed to the dispatcher thread.
        bus.init(rx);
        bus
    }

    fn init(&self, rx: Receiver<Message>) {
        let inner = self.0.clone();
        thread::spawn(move || {
            infoln!("MessageBus>>>");
            let mut count = 0u64;
            while !inner.is_quit.load(Ordering::SeqCst) {
                if let Ok(msg) = rx.recv(200) {
                    count += 1;
                    debugln!("#{} received topic {}", count, msg.topic());
                    let handlers = inner.handlers.lock().unwrap();
                    if let Some(l) = handlers.get(msg.topic()) {
                        for handler in l.values() {
                            let hdr = if let Some(ref hdr) = handler.owned {
                                Some(hdr.clone())
                            } else if let Some(ref hdr) = handler.borrowed {
                                if let Some(h) = Weak::upgrade(hdr) {
                                    Some(h.clone())
                                } else {
                                    None
                                }
                            } else {
                                None
                            };
                            hdr.map(|h| {
                                if !msg.handle_call_data(
                                    |topic, method, data| {
                                        let res = h.call(topic, method, data);
                                        if let Err(ref e) = res {
                                            errorln!(
                                                "topic {} call method {} failed {}",
                                                topic,
                                                method,
                                                e
                                            );
                                        }
                                        res
                                    },
                                    true,
                                ) {
                                    h.handle(&msg);
                                }
                            });
                        }
                    } else {
                        debugln!("topic {} not handler", msg.topic());
                    }
                }
            }
            infoln!("MessageBus<<<");
        });
    }

    /// Registers a handler for `topic`.
    ///
    /// At least one of `owned_handler` and `borrowed_handler` must be `Some`.
    /// An owned handler is wrapped in an `Arc` and kept by the bus; a borrowed
    /// handler is stored as the given `Weak` reference.
    ///
    /// Returns a token that can be passed to `unsubscribe` to remove exactly
    /// this subscription.
    ///
    /// # Panics
    ///
    /// Panics if both `owned_handler` and `borrowed_handler` are `None`.
    pub fn subscribe(
        &self,
        topic: &str,
        owned_handler: Option<Box<dyn MessageHandler>>,
        borrowed_handler: Option<Weak<Box<dyn MessageHandler>>>,
    ) -> String {
        debugln!("subscribe {}", topic);
        assert!(owned_handler.is_some() || borrowed_handler.is_some());
        let seq = self.gen_seq();
        let token = Self::make_token(topic, seq.as_str());
        let mut handlers = self.0.handlers.lock().unwrap();
        let item = MessageBusItem {
            owned: owned_handler.map(Arc::new),
            borrowed: borrowed_handler,
        };
        match handlers.get_mut(topic) {
            Some(subscribers) => {
                subscribers.insert(seq, item);
            }
            None => {
                // First subscription for this topic: create its table.
                let mut subscribers = BTreeMap::new();
                subscribers.insert(seq, item);
                handlers.insert(topic.to_string(), subscribers);
            }
        }
        debugln!("subscribe return token {}", token);
        token
    }

    /// Sends a call message for `method` on `topic` with `args` and waits for
    /// the reply.
    ///
    /// `to_ms` is passed to the reply receiver's `recv` (presumably a timeout
    /// in milliseconds, judging by the name — confirm against `embed_std`).
    ///
    /// Returns the `CallReturn` received on the reply channel. If the message
    /// cannot be queued the failure is logged and returned as `Err` with the
    /// error's text; if receiving the reply fails, that error's text is
    /// returned as `Err`.
    pub fn call(&self, topic: &str, method: &str, args: &[u8], to_ms: u32) -> CallReturn {
        let (msg, reply_rx) = Message::new_call(topic, method, args);
        if let Err(e) = self.0.tx.send(msg) {
            errorln!("send failed: {}", e);
            return Err(format!("{}", e));
        }
        match reply_rx.recv(to_ms) {
            Ok(reply) => reply,
            Err(e) => Err(format!("{}", e)),
        }
    }

    /// Removes every subscription registered for `topic`.
    ///
    /// Returns `true` if the topic had an entry in the handler table, `false`
    /// otherwise.
    pub fn unsubscribe_topic(&self, topic: &str) -> bool {
        debugln!("unsubscribe topic {}", topic);
        // `BTreeMap<String, _>::remove` accepts `&str` directly, so no
        // temporary `String` is needed for the lookup.
        self.0.handlers.lock().unwrap().remove(topic).is_some()
    }

    /// Removes the single subscription identified by `token`, as returned by
    /// `subscribe`.
    ///
    /// If this was the last subscription of its topic, the topic's entry is
    /// removed from the handler table as well.
    ///
    /// Returns `true` if a subscription was removed; `false` if the token
    /// cannot be parsed, the topic is unknown, or the topic has no
    /// subscription with that sequence number.
    pub fn unsubscribe(&self, token: &str) -> bool {
        debugln!("unsubscribe token {}", token);
        let (seq, topic) = match Self::parse_token(token) {
            Some(parts) => parts,
            None => {
                errorln!("token {} invalid", token);
                return false;
            }
        };
        let mut handlers = self.0.handlers.lock().unwrap();
        match handlers.get_mut(topic) {
            Some(subscribers) => {
                let removed = subscribers.remove(seq).is_some();
                if subscribers.is_empty() {
                    infoln!("remove token {}", token);
                    handlers.remove(topic);
                }
                removed
            }
            None => {
                warnln!("topic {} not found!", topic);
                false
            }
        }
    }

    /// Queues a message carrying `data` on `topic` without waiting for any
    /// reply.
    ///
    /// A failed send is logged and otherwise ignored; nothing is reported to
    /// the caller. `_timeout` is currently unused.
    pub fn publish(&self, topic: &str, data: &[u8], _timeout: u32) {
        debugln!("publish topic {}", topic);
        if let Err(e) = self.0.tx.send(Message::new(topic, data)) {
            errorln!("send failed:{}", e);
        }
    }

    /// Returns the current value of the subscription counter as a decimal
    /// string and advances the counter by one.
    fn gen_seq(&self) -> String {
        self.0.token_seq.fetch_add(1, Ordering::SeqCst).to_string()
    }

    /// Builds a subscription token of the form `<seq>|<topic>`.
    fn make_token(topic: &str, seq: &str) -> String {
        let mut token = String::with_capacity(seq.len() + 1 + topic.len());
        token.push_str(seq);
        token.push('|');
        token.push_str(topic);
        token
    }

    /// Splits a subscription token of the form `<seq>|<topic>` into
    /// `(seq, topic)`.
    ///
    /// Only the first `|` is treated as the separator, so a topic that itself
    /// contains `|` still parses. Returns `None` when the token contains no
    /// `|` at all.
    fn parse_token(token: &str) -> Option<(&str, &str)> {
        let mut parts = token.splitn(2, '|');
        match (parts.next(), parts.next()) {
            (Some(seq), Some(topic)) => Some((seq, topic)),
            _ => None,
        }
    }
}
